# Import required libraries
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import math
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
%matplotlib inline
# for auto-reloading external modules
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython
%load_ext autoreload
%autoreload 2
Generative models try to model the distribution of the data explicitly, by learning the joint probability distribution $p(x,y)$. This is in contrast to discriminative models, which try to infer the output directly from the input, by learning the conditional probability $p(y|x)$ or a mapping from inputs $x$ to outputs $y$. In other words, generative models describe how the data is generated, while discriminative models learn the boundaries that discriminate among classes. For example, naive Bayes is a generative classifier, while logistic regression is a discriminative one.
GAN stands for Generative Adversarial Network. The name is apt: a GAN is made up of two competing networks, a generator and a discriminator. The discriminator is a binary classifier whose two classes are "taken from the real data" ("real") and "generated by the generator" ("fake"). Its objective is to minimize the classification loss. The generator's objective is to generate samples such that the discriminator misclassifies them as real.
However, it is important to note that a good generator does not just learn to replicate the real data or find one perfect fake. The generator needs to be able to generate a variety of fake samples such that when presented as a distribution alongside the distribution of real samples, these two are indistinguishable by the discriminator. In order to generate different samples with a deterministic generator, the generator receives random numbers as the initial input.
Typically, for the discriminator we use binary cross entropy loss with label 1 being real and 0 being fake. For the generator, the input is a random vector drawn from a standard normal distribution. Denote the generator by $G_{\phi}(z)$, discriminator by $D_{\theta}(x)$, the distribution of the real samples by $p(x)$ and the input distribution to the generator by $q(z)$. Recall that the binary cross entropy loss with classifier output $y$ and label $\hat{y}$ is
$$L(y, \hat{y}) = -\hat{y} \log y - (1 - \hat{y}) \log (1 - y)$$

For the discriminator, the objective is

$$\min_{\theta} \mathrm{E}_{x \sim p(x)}[L(D_{\theta}(x), 1)] + \mathrm{E}_{z \sim q(z)}[L(D_{\theta}(G_{\phi}(z)), 0)]$$

For the generator, the objective is

$$\max_{\phi} \mathrm{E}_{z \sim q(z)}[L(D_{\theta}(G_{\phi}(z)), 0)]$$

The generator's objective corresponds to maximizing the classification loss of the discriminator on the generated samples. Alternatively, we can minimize the classification loss of the discriminator on the generated samples when they are labelled as real:

$$\min_{\phi} \mathrm{E}_{z \sim q(z)}[L(D_{\theta}(G_{\phi}(z)), 1)]$$

The strength of the two networks should be balanced, so we train the two networks alternately, updating the parameters of each network once per iteration.
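To make these objectives concrete, here is a minimal NumPy sketch (illustrative only; `d_real` and `d_fake` are made-up probe values, not outputs of a trained model) showing how both losses reduce to binary cross entropy on the discriminator's output:

import numpy as np

def bce(y, y_hat):
    # L(y, y_hat) = -y_hat * log(y) - (1 - y_hat) * log(1 - y)
    return -y_hat * np.log(y) - (1 - y_hat) * np.log(1 - y)

d_real = 0.9  # D(x) on a real sample; the discriminator pushes this toward 1
d_fake = 0.2  # D(G(z)) on a fake sample; the discriminator pushes this toward 0

dis_loss = bce(d_real, 1) + bce(d_fake, 0)  # the discriminator minimizes this
gen_loss = bce(d_fake, 1)                   # the generator minimizes this (fakes labelled as real)
print('dis loss = %.4f, gen loss = %.4f' % (dis_loss, gen_loss))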
Note: I will be using CIFAR-10 data. You can download it using the following shell script:
wget http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
tar -xzvf cifar-10-python.tar.gz
rm cifar-10-python.tar.gz
def unpickle(file):
    # Load one pickled CIFAR-10 batch; handle both Python 2 and Python 3 pickles
    import sys
    if sys.version_info.major == 2:
        import cPickle
        with open(file, 'rb') as fo:
            batch = cPickle.load(fo)
        return batch['data'], batch['labels']
    else:
        import pickle
        with open(file, 'rb') as fo:
            batch = pickle.load(fo, encoding='bytes')
        return batch[b'data'], batch[b'labels']
def load_train_data():
X = []
for i in range(5):
X_, _ = unpickle('data/cifar-10-batches-py/data_batch_%d' % (i + 1))
X.append(X_)
X = np.concatenate(X)
X = X.reshape((X.shape[0], 3, 32, 32)).transpose(0, 2, 3, 1)
return X
def load_test_data():
X_, _ = unpickle('data/cifar-10-batches-py/test_batch')
X = X_.reshape((X_.shape[0], 3, 32, 32)).transpose(0, 2, 3, 1)
return X
# Load CIFAR-10 data and scale pixel values to [0, 1] (matching the generator's sigmoid output)
train_samples = load_train_data() / 255.0
test_samples = load_test_data() / 255.0
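As a quick check (optional), CIFAR-10 should yield 50,000 training and 10,000 test images of size 32x32x3:

print(train_samples.shape, test_samples.shape)  # (50000, 32, 32, 3) (10000, 32, 32, 3)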
def viz_grid(Xs, padding):
N, H, W, C = Xs.shape
grid_size = int(math.ceil(math.sqrt(N)))
grid_height = H * grid_size + padding * (grid_size + 1)
grid_width = W * grid_size + padding * (grid_size + 1)
grid = np.zeros((grid_height, grid_width, C))
next_idx = 0
y0, y1 = padding, H + padding
for y in range(grid_size):
x0, x1 = padding, W + padding
for x in range(grid_size):
if next_idx < N:
img = Xs[next_idx]
grid[y0:y1, x0:x1] = img
next_idx += 1
x0 += W + padding
x1 += W + padding
y0 += H + padding
y1 += H + padding
return grid
def set_seed(seed):
np.random.seed(seed)
tf.set_random_seed(seed)
def conv2d(input, kernel_size, stride, num_filter, name = 'conv2d'):
with tf.variable_scope(name):
stride_shape = [1, stride, stride, 1]
filter_shape = [kernel_size, kernel_size, input.get_shape()[3], num_filter]
W = tf.get_variable('w', filter_shape, tf.float32, tf.random_normal_initializer(0.0, 0.02))
b = tf.get_variable('b', [1, 1, 1, num_filter], initializer = tf.constant_initializer(0.0))
return tf.nn.conv2d(input, W, stride_shape, padding = 'SAME') + b
def conv2d_transpose(input, kernel_size, stride, num_filter, name = 'conv2d_transpose'):
with tf.variable_scope(name):
stride_shape = [1, stride, stride, 1]
filter_shape = [kernel_size, kernel_size, num_filter, input.get_shape()[3]]
        # Note: the output spatial size is hard-coded to 2x the input,
        # matching the stride-2 upsampling used throughout this notebook.
        output_shape = tf.stack([tf.shape(input)[0], tf.shape(input)[1] * 2, tf.shape(input)[2] * 2, num_filter])
W = tf.get_variable('w', filter_shape, tf.float32, tf.random_normal_initializer(0.0, 0.02))
b = tf.get_variable('b', [1, 1, 1, num_filter], initializer = tf.constant_initializer(0.0))
return tf.nn.conv2d_transpose(input, W, output_shape, stride_shape, padding = 'SAME') + b
def fc(input, num_output, name = 'fc'):
with tf.variable_scope(name):
num_input = input.get_shape()[1]
W = tf.get_variable('w', [num_input, num_output], tf.float32, tf.random_normal_initializer(0.0, 0.02))
b = tf.get_variable('b', [num_output], initializer = tf.constant_initializer(0.0))
return tf.matmul(input, W) + b
def batch_norm(input, is_training):
out = tf.contrib.layers.batch_norm(input, decay = 0.99, center = True, scale = True,
is_training = is_training, updates_collections = None)
return out
def leaky_relu(input, alpha = 0.2):
return tf.maximum(alpha * input, input)
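As a quick sanity check of these helpers (a throwaway snippet; `check_conv` is just an arbitrary scope name), a stride-2 convolution with 'SAME' padding should halve the spatial dimensions:

tf.reset_default_graph()
x = tf.placeholder(tf.float32, [None, 32, 32, 3])
y = conv2d(x, 4, 2, 32, 'check_conv')
print(y.get_shape().as_list())  # expected: [None, 16, 16, 32]
tf.reset_default_graph()  # leave a clean graph for the model below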
`_discriminator()` and `_generator()` define the discriminator and the generator. `_init_ops()` builds the computational graph for the losses:

- `fake_samples_op`: generate fake samples from noise
- `dis_loss_op`: compute the discriminator's loss, with real samples from `real_input` and fake samples generated by the generator
- `gen_loss_op`: compute the generator's loss

`dis_train_op` and `gen_train_op` define the optimization functions.
The batch normalization layers should operate in training mode. As per *How to Train a GAN? Tips and tricks to make GANs work*, real samples and fake samples are put in different batches when training the discriminator, as sketched below.
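As a sketch of what this tip means here (commented pseudocode, not an exact excerpt from the class below):

# Avoid mixing batch types: batch norm statistics would then be computed
# over real and fake samples together.
#   logits = self._discriminator(tf.concat([real_batch, fake_batch], axis = 0))
# Instead, make two variable-sharing discriminator calls, one per batch type,
# as _init_ops() below does:
#   real_logits = self._discriminator(self.real_input)
#   fake_logits = self._discriminator(self.fake_samples_op)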
class DCGAN(object):
def __init__(self):
self.num_epoch = 25
self.batch_size = 32
self.log_step = 50
self.visualize_step = 200
self.code_size = 64
self.learning_rate = 1e-4
self.vis_learning_rate = 1e-2
self.recon_steps = 100
self.actmax_steps = 100
self._dis_called = False
self._gen_called = False
self.tracked_noise = np.random.normal(0, 1, [64, self.code_size])
self.real_input = tf.placeholder(tf.float32, [None, 32, 32, 3])
self.real_label = tf.placeholder(tf.float32, [None, 1])
self.fake_label = tf.placeholder(tf.float32, [None, 1])
self.noise = tf.placeholder(tf.float32, [None, self.code_size])
self.is_train = tf.placeholder(tf.bool)
self.recon_sample = tf.placeholder(tf.float32, [1, 32, 32, 3])
self.actmax_label = tf.placeholder(tf.float32, [1, 1])
with tf.variable_scope('actmax'):
self.actmax_code = tf.get_variable('actmax_code', [1, self.code_size],
initializer = tf.constant_initializer(0.0))
self._init_ops()
def _discriminator(self, input):
# We have multiple instances of the discriminator in the same computation graph,
# so set variable sharing if this is not the first invocation of this function.
with tf.variable_scope('dis', reuse = self._dis_called):
self._dis_called = True
dis_conv1 = conv2d(input, 4, 2, 32, 'conv1')
dis_lrelu1 = leaky_relu(dis_conv1)
dis_conv2 = conv2d(dis_lrelu1, 4, 2, 64, 'conv2')
dis_batchnorm2 = batch_norm(dis_conv2, self.is_train)
dis_lrelu2 = leaky_relu(dis_batchnorm2)
dis_conv3 = conv2d(dis_lrelu2, 4, 2, 128, 'conv3')
dis_batchnorm3 = batch_norm(dis_conv3, self.is_train)
dis_lrelu3 = leaky_relu(dis_batchnorm3)
dis_reshape3 = tf.reshape(dis_lrelu3, [-1, 4 * 4 * 128])
dis_fc4 = fc(dis_reshape3, 1, 'fc4')
return dis_fc4
def _generator(self, input):
with tf.variable_scope('gen', reuse = self._gen_called):
self._gen_called = True
gen_fc1 = fc(input, 4 * 4 * 128, 'fc1')
gen_reshape1 = tf.reshape(gen_fc1, [-1, 4, 4, 128])
gen_batchnorm1 = batch_norm(gen_reshape1, self.is_train)
gen_lrelu1 = leaky_relu(gen_batchnorm1)
gen_conv2 = conv2d_transpose(gen_lrelu1, 4, 2, 64, 'conv2')
gen_batchnorm2 = batch_norm(gen_conv2, self.is_train)
gen_lrelu2 = leaky_relu(gen_batchnorm2)
gen_conv3 = conv2d_transpose(gen_lrelu2, 4, 2, 32, 'conv3')
gen_batchnorm3 = batch_norm(gen_conv3, self.is_train)
gen_lrelu3 = leaky_relu(gen_batchnorm3)
gen_conv4 = conv2d_transpose(gen_lrelu3, 4, 2, 3, 'conv4')
gen_sigmoid4 = tf.sigmoid(gen_conv4)
return gen_sigmoid4
def _loss(self, labels, logits):
loss = tf.nn.sigmoid_cross_entropy_with_logits(labels = labels, logits = logits)
return tf.reduce_mean(loss)
def _reconstruction_loss(self, generated, target):
loss = tf.nn.l2_loss(generated - target)
return tf.reduce_mean(loss)
# Define operations
def _init_ops(self):
        # Generate fake samples from noise
        self.fake_samples_op = self._generator(self.noise)
        # Compute the discriminator's loss, with real samples from real_input
        # and fake samples generated by the generator
        self.dis_loss_op = self._loss(self.real_label, self._discriminator(self.real_input)) \
                         + self._loss(self.fake_label, self._discriminator(self.fake_samples_op))
        # Compute the generator's loss: fake samples labelled as real
        self.gen_loss_op = self._loss(self.real_label, self._discriminator(self.fake_samples_op))
dis_optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
dis_train_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
"dis")
self.dis_train_op = dis_optimizer.minimize(self.dis_loss_op, var_list=dis_train_vars)
gen_optimizer = tf.train.RMSPropOptimizer(self.learning_rate)
gen_train_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
"gen")
self.gen_train_op = gen_optimizer.minimize(self.gen_loss_op, var_list=gen_train_vars)
self.actmax_sample_op = self._generator(self.actmax_code)
actmax_dis = self._discriminator(self.actmax_sample_op)
self.actmax_loss_op = self._loss(self.actmax_label, actmax_dis)
actmax_optimizer = tf.train.AdamOptimizer(self.vis_learning_rate)
self.actmax_op = actmax_optimizer.minimize(self.actmax_loss_op, var_list = [self.actmax_code])
        self.recon_loss_op = self._reconstruction_loss(self.actmax_sample_op, self.recon_sample)  # args: (generated, target)
recon_train_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,"actmax")
recon_optimizer = tf.train.AdamOptimizer(self.vis_learning_rate)
self.reconstruct_op = recon_optimizer.minimize(self.recon_loss_op,var_list = [self.actmax_code])
# Training function
def train(self, sess, train_samples):
sess.run(tf.global_variables_initializer())
num_train = train_samples.shape[0]
step = 0
# smooth the loss curve so that it does not fluctuate too much
smooth_factor = 0.95
plot_dis_s = 0
plot_gen_s = 0
plot_ws = 0
dis_losses = []
gen_losses = []
max_steps = int(self.num_epoch * (num_train // self.batch_size))
print('Start training ...')
for epoch in range(self.num_epoch):
for i in range(num_train // self.batch_size):
step += 1
batch_samples = train_samples[i * self.batch_size : (i + 1) * self.batch_size]
noise = np.random.normal(0, 1, [self.batch_size, self.code_size])
zeros = np.zeros([self.batch_size, 1])
ones = np.ones([self.batch_size, 1])
                # Feed dictionaries; is_train is True since batch norm should run in training mode
                dis_feed_dict = {self.is_train: True, self.real_input: batch_samples, self.noise: noise,
                                 self.real_label: ones, self.fake_label: zeros}
                _, dis_loss = sess.run([self.dis_train_op, self.dis_loss_op], feed_dict = dis_feed_dict)
                gen_feed_dict = {self.is_train: True, self.real_input: batch_samples, self.noise: noise,
                                 self.real_label: ones, self.fake_label: zeros}
_, gen_loss = sess.run([self.gen_train_op, self.gen_loss_op], feed_dict = gen_feed_dict)
plot_dis_s = plot_dis_s * smooth_factor + dis_loss * (1 - smooth_factor)
plot_gen_s = plot_gen_s * smooth_factor + gen_loss * (1 - smooth_factor)
plot_ws = plot_ws * smooth_factor + (1 - smooth_factor)
dis_losses.append(plot_dis_s / plot_ws)
gen_losses.append(plot_gen_s / plot_ws)
if step % self.log_step == 0:
print('Iteration {0}/{1}: dis loss = {2:.4f}, gen loss = {3:.4f}'.format(step, max_steps, dis_loss, gen_loss))
fig = plt.figure(figsize = (8, 8))
ax1 = plt.subplot(111)
ax1.imshow(viz_grid(self.generate(self.tracked_noise), 1))
plt.show()
plt.plot(dis_losses)
plt.title('discriminator loss')
plt.xlabel('iterations')
plt.ylabel('loss')
plt.show()
plt.plot(gen_losses)
plt.title('generator loss')
plt.xlabel('iterations')
plt.ylabel('loss')
plt.show()
print('... Done!')
# Find the reconstruction of one input sample
def reconstruct_one_sample(self, sample):
        # Start the optimization from a zero code vector
        actmax_init_val = np.zeros([1, self.code_size])
        sess.run(self.actmax_code.assign(actmax_init_val))
last_reconstruction = None
last_loss = None
for i in range(self.recon_steps):
# Feed dict
recon_feed_dict = { self.recon_sample: sample, self.is_train: False}
run_ops = [self.recon_loss_op, self.reconstruct_op, self.actmax_sample_op]
last_loss, _, last_reconstruction = sess.run(run_ops, feed_dict = recon_feed_dict)
return last_loss, last_reconstruction
# Find the reconstruction of a batch of samples
def reconstruct(self, samples):
reconstructions = np.zeros(samples.shape)
total_loss = 0
for i in range(samples.shape[0]):
loss, reconstructions[i:i+1] = self.reconstruct_one_sample(samples[i:i+1])
print(loss)
total_loss += loss
return total_loss / samples.shape[0], reconstructions
# Generates a single sample from input code
def generate_one_sample(self, code):
# Feed dict
        # Batch norm in evaluation mode when sampling
        gen_vis_feed_dict = {self.noise: code, self.is_train: False}
generated = sess.run(self.fake_samples_op, feed_dict = gen_vis_feed_dict)
return generated
# Generates samples from input batch of codes
def generate(self, codes):
generated = np.zeros((codes.shape[0], 32, 32, 3))
for i in range(codes.shape[0]):
generated[i:i+1] = self.generate_one_sample(codes[i:i+1])
return generated
# Perform activation maximization on one initial code
def actmax_one_sample(self, initial_code):
actmax_init_val = tf.convert_to_tensor(initial_code, dtype = tf.float32)
sess.run(self.actmax_code.assign(actmax_init_val))
for i in range(self.actmax_steps):
            actmax_feed_dict = {self.actmax_label: np.ones([1, 1]), self.is_train: False}
_, last_actmax = sess.run([self.actmax_op, self.actmax_sample_op], feed_dict = actmax_feed_dict)
return last_actmax
# Perform activation maximization on a batch of different initial codes
def actmax(self, initial_codes):
actmax_results = np.zeros((initial_codes.shape[0], 32, 32, 3))
for i in range(initial_codes.shape[0]):
actmax_results[i:i+1] = self.actmax_one_sample(initial_codes[i:i+1])
return actmax_results.clip(0, 1)
tf.reset_default_graph()
set_seed(21)
with tf.Session() as sess:
with tf.device('/cpu:0'):
dcgan = DCGAN()
sess.run(tf.global_variables_initializer())
dcgan.train(sess, train_samples)
dis_var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'dis')
gen_var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'gen')
        saver = tf.train.Saver(dis_var_list + gen_var_list)
        # Note: the 'model' directory must already exist, or saver.save will fail
        saver.save(sess, 'model/dcgan')
GANs are notoriously difficult to debug. Here we present one method designed for investigating what networks learn, which can be helpful for debugging GANs.
Activation Maximization is a visualization technique to see what a particular neuron has learned, by finding the input that maximizes the activation of that neuron. Here we use methods similar to *Synthesizing the preferred inputs for neurons in neural networks via deep generator networks*.
In short, we want to find the samples that the discriminator considers most real among all possible outputs of the generator. That is, we want to find the codes (points in the input space of the generator) whose generated images, if labelled as real, would minimize the classification loss of the discriminator:
$$\min_{z} L(D_{\theta}(G_{\phi}(z)), 1)$$

Compare this to the objective when we were training the generator:

$$\min_{\phi} \mathrm{E}_{z \sim q(z)}[L(D_{\theta}(G_{\phi}(z)), 1)]$$

The function to minimize is the same; the difference is that when training the network we fix a set of input data and find the optimal model parameters, while in activation maximization we fix the model parameters and find the optimal input.

So, similar to training, we use gradient descent to solve for the optimal input. Starting from a random code drawn from a standard normal distribution, we perform a fixed number of steps of the Adam optimization algorithm on the code.
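Written as plain gradient descent for clarity (the implementation above actually uses Adam), each activation maximization step updates the code as

$$z \leftarrow z - \eta \, \nabla_{z} L(D_{\theta}(G_{\phi}(z)), 1)$$

where $\eta$ is the visualization learning rate (`vis_learning_rate` above).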
The batch normalization layers should work in evaluation mode.
tf.reset_default_graph()
set_seed(241)
with tf.Session() as sess:
with tf.device('/cpu:0'):
dcgan = DCGAN()
sess.run(tf.global_variables_initializer())
dis_var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'dis')
gen_var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'gen')
saver = tf.train.Saver(dis_var_list + gen_var_list)
saver.restore(sess, 'model/dcgan')
        actmax_results = dcgan.actmax(np.random.normal(0, 1, [64, dcgan.code_size]))
fig = plt.figure(figsize = (8, 8))
ax1 = plt.subplot(111)
ax1.imshow(viz_grid(actmax_results, 1))
plt.show()
The output should have less variety than samples generated from random codes. While it is reasonable that the samples that are "most real" make up only a small portion of the sample space, this also hints that so-called "mode collapse", in which the GAN simply fails to model a large part of the data distribution, is a real problem.
A similar technique can be used to reconstruct a test sample, that is, to find the code that most closely approximates the test sample. To achieve this, we only need to change the loss function from discriminator's loss to the squared L2-distance between the generated image and the target image:
$$\min_{z} \left|\left|G_{\phi}(z)-x\right|\right|_2^2$$

This time, we always start from a zero vector.
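As before, each step follows the gradient of this loss with respect to the code (again Adam in the implementation):

$$z \leftarrow z - \eta \, \nabla_{z} \left|\left|G_{\phi}(z)-x\right|\right|_2^2$$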
tf.reset_default_graph()
with tf.Session() as sess:
with tf.device('/cpu:0'):
dcgan = DCGAN()
sess.run(tf.global_variables_initializer())
dis_var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'dis')
gen_var_list = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, 'gen')
saver = tf.train.Saver(dis_var_list + gen_var_list)
saver.restore(sess, 'model/dcgan')
avg_loss, reconstructions = dcgan.reconstruct(test_samples[0:64])
print('average reconstruction loss = {0:.4f}'.format(avg_loss))
fig = plt.figure(figsize = (16, 16))
ax1 = plt.subplot(111)
ax1.imshow(viz_grid(np.concatenate((test_samples[0:64], reconstructions), axis=2), 1))
plt.show()